Followable Apps Documentation
Table of Contents
- What is a Followable App?
 - When to Use Followable Apps
 - Where to Use Followable Apps
 - How to Use Followable Apps
 - Examples
 - Best Practices
 - Troubleshooting
 - Conclusion
 
What is a Followable App?
A Followable App is a specialized type of Corva backend application that can be followed by other applications, creating a chain reaction of app executions. When a followable app produces data, it automatically triggers the execution of any apps that are configured to follow it.
Key Characteristics:
- Data Producer: Followable apps must produce data to trigger following apps
 - Chain Reaction: They enable cascading workflows where one app's output becomes another app's input
 - App Type Restrictions: Only stream and scheduled apps can be made followable
 - Real-time Processing: They facilitate real-time data processing pipelines
 
Core Concept:
Followable App → Produces Data → Triggers Following Apps → Chain Reaction
The followable app architecture enables building complex data processing workflows where apps can depend on the output of other apps, creating sophisticated real-time analytics pipelines.
When to Use Followable Apps
Primary Use Cases:
Multi-Stage Data Processing
- When you need to process data through multiple computational stages
 - Each stage depends on the output of the previous stage
 - Example: Raw sensor data → Cleaned data → Analytics → Alerts
 
Real-time Analytics Pipelines
- When building complex analytics workflows
 - Multiple apps need to react to the same data events
 - Real-time calculations that depend on processed results
 
Event-Driven Architectures
- When you want to trigger multiple processes from a single data event
 - Decoupling data producers from data consumers
 - Building reactive systems that respond to data changes
 
Data Enrichment Workflows
- When raw data needs multiple enrichment steps
 - Each enrichment step can be handled by a specialized app
 - Final enriched data is used by downstream applications
 
Scenarios Where Followable Apps Excel:
- Drilling Operations: Raw drilling data → Processed parameters → Alerts/Recommendations
 - Well Performance: Production data → Calculations → Performance metrics → Optimization suggestions
 - Equipment Monitoring: Sensor data → Health indicators → Predictive maintenance alerts
 - Quality Control: Raw measurements → Statistical analysis → Quality scores → Reports
 
Where to Use Followable Apps
Platform Context:
Followable apps operate within the Corva DevCenter ecosystem and are built using the Corva Python SDK.
Infrastructure Requirements:
Corva DevCenter Account
- Backend app development environment
 - Access to datasets and APIs
 - App deployment and management tools
 
Data Sources
- Real-time data streams (for stream apps)
 - Scheduled data intervals (for scheduled apps)
 - Proper data ingestion setup
 
Datasets
- Write permissions to target datasets
 - Proper dataset configuration for data storage
 - Understanding of data schema requirements
 
Architecture Placement:
Data Sources → Stream/Scheduled Apps (Followable) → Following Apps → End Users/Systems
Integration Points:
- Upstream: Data ingestion systems, sensors, databases
 - Downstream: Visualization tools, alert systems, reporting applications
 - Lateral: Other Corva apps, external APIs, notification systems
 
How to Use Followable Apps
Step 1: Create a Followable App
Basic Structure:
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Process your data
    processed_data = process_raw_data(event)
    
    # Prepare data for storage and following apps
    data = [
        {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': processed_data['timestamp'],
            'data': processed_data['results'],
        }
    ]
    
    # Store data and trigger following apps (recommended method)
    api.insert_data(
        provider='your-provider',
        dataset='your-dataset',
        data=data,
        produce=True  # This flag enables following apps to be triggered
    )
    
    return "Data processed and published successfully"
Step 2: Configure the App as Followable
In the Corva DevCenter:
- Open your app page
 - Navigate to Settings tab
 - Activate "Followable App" toggle
 - Select a dataset (must have write permissions)
 - Save configuration
 
Step 3: Create Following Apps
Scheduled Following App:
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def following_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Query data from the followable app's dataset
    data = api.get_dataset(
        provider='your-provider',
        dataset='followable-app-dataset',
        query={
            'asset_id': event.asset_id, 
            'company_id': event.company_id
        },
        sort={'timestamp': 1},
        limit=100,
    )
    
    # Process the data
    results = analyze_followable_data(data)
    
    # Store results or trigger additional actions
    return results
Stream Following App:
from corva import Api, Cache, StreamTimeEvent, stream
@stream
def following_stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
    # Data is automatically provided in the event
    for record in event.records:
        # Process each data record from the followable app
        process_followable_record(record.data)
    
    return "Stream processing completed"
Step 4: Configure Following Relationships
In the Corva DevCenter when creating a following app:
- Choose same Segment and Log Type as the followable app
 - Select the followable app from "Select the data that your app will follow"
 - Complete app creation
 
Data Production Methods:
Method 1: Separate API Calls (Slower)
# First, insert data
api.insert_data(
    provider='my-provider',
    dataset='my-dataset',
    data=data,
)
# Then, produce messages
api.produce_messages(data=data)
Method 2: Combined API Call (Recommended)
# Insert data and produce messages in one call
api.insert_data(
    provider='my-provider',
    dataset='my-dataset',
    data=data,
    produce=True  # Enable message production
)
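The combined call is recommended because it writes the records and produces the messages in a single request, avoiding the extra network round trip of the two-step approach.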
Examples
Example 1: Drilling Data Processing Pipeline
Followable App - Raw Data Processor:
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def drilling_data_processor(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    """
    Processes raw drilling data and calculates basic parameters
    """
    # Get raw drilling data
    raw_data = get_raw_drilling_data(event.asset_id, event.start_time, event.end_time)
    
    processed_records = []
    for record in raw_data:
        # Calculate drilling parameters
        processed_record = {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': record['timestamp'],
            'data': {
                'depth': record['depth'],
                'rop': calculate_rop(record),
                'wob': record['weight_on_bit'],
                'rpm': record['rotary_speed'],
                'torque': record['torque'],
                'flow_rate': record['mud_flow_rate'],
                'processed_at': int(time.time())
            }
        }
        processed_records.append(processed_record)
    
    # Store processed data and trigger following apps
    api.insert_data(
        provider='drilling-analytics',
        dataset='processed-drilling-data',
        data=processed_records,
        produce=True
    )
    
    return f"Processed {len(processed_records)} drilling records"
Following App - Performance Analyzer:
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled
@scheduled
def drilling_performance_analyzer(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    """
    Analyzes processed drilling data for performance metrics
    """
    # Get processed data from the followable app
    processed_data = api.get_dataset(
        provider='drilling-analytics',
        dataset='processed-drilling-data',
        query={
            'asset_id': event.asset_id,
            'company_id': event.company_id,
            'timestamp': {'$gte': event.start_time, '$lte': event.end_time}
        },
        sort={'timestamp': 1},
        limit=1000,  # cap results; adjust to the expected record volume per interval
    )
    
    if not processed_data:
        return "No processed data available"
    
    # Calculate performance metrics
    performance_metrics = []
    for record in processed_data:
        data = record['data']
        
        # Calculate drilling efficiency
        efficiency_score = calculate_drilling_efficiency(
            data['rop'], data['wob'], data['rpm'], data['torque']
        )
        
        # Detect anomalies
        anomalies = detect_drilling_anomalies(data)
        
        performance_record = {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': record['timestamp'],
            'data': {
                'efficiency_score': efficiency_score,
                'anomalies': anomalies,
                'recommendations': generate_recommendations(data, efficiency_score),
                'analyzed_at': int(time.time())
            }
        }
        performance_metrics.append(performance_record)
    
    # Store performance analysis
    api.insert_data(
        provider='drilling-analytics',
        dataset='drilling-performance',
        data=performance_metrics,
        produce=True  # This could trigger alert systems
    )
    
    return f"Analyzed performance for {len(performance_metrics)} records"
Example 2: Real-time Well Monitoring
Followable App - Sensor Data Aggregator:
import time

from corva import Api, Cache, StreamTimeEvent, stream
@stream
def sensor_data_aggregator(event: StreamTimeEvent, api: Api, cache: Cache):
    """
    Aggregates and validates real-time sensor data
    """
    aggregated_data = []
    
    for record in event.records:
        # Validate and clean sensor data
        if validate_sensor_data(record.data):
            aggregated_record = {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': record.timestamp,
                'data': {
                    'pressure': record.data.get('pressure'),
                    'temperature': record.data.get('temperature'),
                    'flow_rate': record.data.get('flow_rate'),
                    'vibration': record.data.get('vibration'),
                    'quality_score': calculate_data_quality(record.data),
                    'aggregated_at': int(time.time())
                }
            }
            aggregated_data.append(aggregated_record)
    
    if aggregated_data:
        # Store aggregated data and trigger following apps
        api.insert_data(
            provider='well-monitoring',
            dataset='aggregated-sensor-data',
            data=aggregated_data,
            produce=True
        )
    
    return f"Aggregated {len(aggregated_data)} sensor records"
Following App - Alert System:
import time

from corva import Api, Cache, StreamTimeEvent, stream

# Illustrative alert thresholds; tune these for your operation
PRESSURE_THRESHOLD = 5000     # e.g. psi
TEMPERATURE_THRESHOLD = 300   # e.g. degF
QUALITY_THRESHOLD = 0.8       # unitless quality score
@stream
def well_alert_system(event: StreamTimeEvent, api: Api, cache: Cache):
    """
    Monitors aggregated data for alert conditions
    """
    alerts_generated = []
    
    for record in event.records:
        data = record.data
        
        # Check for alert conditions
        alerts = []
        
        if data['pressure'] is not None and data['pressure'] > PRESSURE_THRESHOLD:
            alerts.append({
                'type': 'HIGH_PRESSURE',
                'severity': 'CRITICAL',
                'message': f"Pressure {data['pressure']} exceeds threshold {PRESSURE_THRESHOLD}"
            })
        
        if data['temperature'] is not None and data['temperature'] > TEMPERATURE_THRESHOLD:
            alerts.append({
                'type': 'HIGH_TEMPERATURE',
                'severity': 'WARNING',
                'message': f"Temperature {data['temperature']} exceeds threshold {TEMPERATURE_THRESHOLD}"
            })
        
        if data['quality_score'] < QUALITY_THRESHOLD:
            alerts.append({
                'type': 'POOR_DATA_QUALITY',
                'severity': 'INFO',
                'message': f"Data quality score {data['quality_score']} below threshold"
            })
        
        if alerts:
            alert_record = {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': record.timestamp,
                'data': {
                    'alerts': alerts,
                    'sensor_data': data,
                    'generated_at': int(time.time())
                }
            }
            alerts_generated.append(alert_record)
    
    if alerts_generated:
        # Store alerts
        api.insert_data(
            provider='well-monitoring',
            dataset='well-alerts',
            data=alerts_generated,
            produce=True  # Could trigger notification systems
        )
        
        # Send immediate notifications for critical alerts
        for alert_record in alerts_generated:
            for alert in alert_record['data']['alerts']:
                if alert['severity'] == 'CRITICAL':
                    send_immediate_notification(alert, event.asset_id)
    
    return f"Generated {len(alerts_generated)} alert records"
Best Practices
1. Data Design
- Consistent Schema: Ensure data structure consistency across followable and following apps
 - Versioning: Use version fields to handle schema evolution
 - Timestamps: Always include accurate timestamps for proper event ordering
 - Asset Context: Include asset_id and company_id for proper data isolation
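
A minimal sketch of a record builder that applies these conventions; build_record and its payload keys are illustrative helpers, not part of the SDK:
import time

def build_record(asset_id: int, payload: dict, version: int = 1) -> dict:
    # Apply the conventions above: asset context for data isolation, a schema
    # version for evolution, and an accurate timestamp for event ordering.
    return {
        'asset_id': asset_id,
        'version': version,
        'timestamp': int(payload.get('timestamp', time.time())),
        'data': payload,
    }

# Hypothetical usage with a drilling payload
record = build_record(asset_id=1234, payload={'timestamp': 1700000000, 'rop': 55.2})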
 
2. Performance Optimization
- Batch Processing: Process data in batches when possible
 - Efficient Queries: Use proper indexing and query optimization
 - Data Size Management: Avoid producing excessively large data payloads
 - Caching Strategy: Implement caching for frequently accessed data
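
Caching with the SDK-provided Cache can look like the sketch below. This is a minimal sketch assuming the Cache exposes set/get with string values (method names vary across SDK versions, so check the corva-sdk documentation); fetch_well_config is a hypothetical stand-in for an expensive lookup:
import json

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

def fetch_well_config(api: Api, asset_id: int) -> dict:
    # Hypothetical expensive lookup (e.g. an API call); stubbed for the sketch.
    return {'asset_id': asset_id}

@scheduled
def cached_config_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Try the cache first; fall back to the lookup and cache the result.
    cached = cache.get(key='well-config')
    if cached:
        config = json.loads(cached)
    else:
        config = fetch_well_config(api, event.asset_id)
        cache.set(key='well-config', value=json.dumps(config))
    return config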
 
3. Error Handling
import time

from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled

@scheduled
def robust_followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    try:
        # Main processing logic
        data = process_data(event)
        
        # Validate data before publishing
        if validate_data(data):
            api.insert_data(
                provider='your-provider',
                dataset='your-dataset',
                data=data,
                produce=True
            )
            return "Success"
        else:
            raise ValueError("Data validation failed")
            
    except Exception as e:
        # Log error for debugging
        Logger.error(f"Followable app error: {str(e)}")
        
        # Optionally, publish error information
        error_data = [{
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': int(time.time()),
            'data': {
                'error': str(e),
                'event_info': str(event)
            }
        }]
        
        api.insert_data(
            provider='your-provider',
            dataset='error-log',
            data=error_data
        )
        
        raise  # Re-raise to ensure proper error handling
4. Monitoring and Logging
import time

from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled

@scheduled
def monitored_followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    start_time = time.time()
    
    try:
        # Log app start
        Logger.info(f"Starting followable app for asset {event.asset_id}")
        
        # Process data
        data = process_data(event)
        
        # Log processing metrics
        processing_time = time.time() - start_time
        Logger.info(f"Processed {len(data)} records in {processing_time:.2f} seconds")
        
        # Publish data
        api.insert_data(
            provider='your-provider',
            dataset='your-dataset',
            data=data,
            produce=True
        )
        
        # Log success
        Logger.info(f"Successfully published data for {len(data)} records")
        
        return f"Processed {len(data)} records"
        
    except Exception as e:
        # Log error with context
        Logger.error(f"Followable app failed: {str(e)}", extra={
            'asset_id': event.asset_id,
            'event_type': type(event).__name__,
            'processing_time': time.time() - start_time
        })
        raise
Troubleshooting
Common Issues and Solutions
1. Following Apps Not Triggering
Symptoms:
- Followable app runs successfully
 - Data is stored in dataset
 - Following apps don't execute
 
Solutions:
# Ensure produce=True is set
api.insert_data(
    provider='your-provider',
    dataset='your-dataset',
    data=data,
    produce=True  # This is crucial!
)
# Or use separate produce_messages call
api.insert_data(provider='your-provider', dataset='your-dataset', data=data)
api.produce_messages(data=data)
Checklist:
- produce=True flag is set in insert_data()
 - Following apps have correct segment and log type
 - Followable app configuration is properly saved
 - Dataset has correct write permissions
 
2. Data Schema Mismatches
Symptoms:
- Following apps receive unexpected data structure
 - Processing errors in following apps
 
Solutions:
import time

# Standardize data structure
def standardize_data_format(raw_data, asset_id):
    return {
        'asset_id': asset_id,
        'version': 1,  # Always include version
        'timestamp': int(raw_data.get('timestamp', time.time())),
        'data': {
            # Your actual data here
            'value1': raw_data.get('value1'),
            'value2': raw_data.get('value2'),
            # Include metadata
            'source': 'followable_app_name',
            'processed_at': int(time.time())
        }
    }
3. Performance Issues
Symptoms:
- Slow app execution
 - Following apps timing out
 - Large data volumes causing issues
 
Solutions:
# Implement batching
def process_in_batches(data, batch_size=100):
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        yield batch
# Process and publish in batches
for batch in process_in_batches(large_dataset):
    processed_batch = process_batch(batch)
    api.insert_data(
        provider='your-provider',
        dataset='your-dataset',
        data=processed_batch,
        produce=True
    )
4. Debugging Following Relationships
Debug Steps:
Verify App Configuration:
# In the following app, log received data
Logger.info(f"Received {len(event.records)} records from followable app")
for record in event.records:
    Logger.debug(f"Record data: {record.data}")
Check Dataset Contents:
# Query the dataset directly
stored_data = api.get_dataset(
    provider='your-provider',
    dataset='your-dataset',
    query={'asset_id': event.asset_id},
    sort={'timestamp': -1},
    limit=10
)
Logger.info(f"Found {len(stored_data)} records in dataset")
Validate Event Flow:
# Add comprehensive logging
from corva import Api, Cache, Logger, StreamTimeEvent, stream

@stream
def debug_following_app(event: StreamTimeEvent, api: Api, cache: Cache):
    Logger.info(f"Following app triggered with {len(event.records)} records")
    Logger.info(f"Event asset_id: {event.asset_id}")
    timestamps = [record.timestamp for record in event.records]
    if timestamps:
        Logger.info(f"Record timestamp range: {min(timestamps)} to {max(timestamps)}")
    for i, record in enumerate(event.records):
        Logger.debug(f"Record {i}: {record.data}")
Error Messages and Solutions
| Error Message | Cause | Solution | 
|---|---|---|
| "No following apps configured" | App not marked as followable | Enable followable toggle in DevCenter | 
| "Dataset write permission denied" | Insufficient permissions | Check dataset permissions in settings | 
| "Invalid data format" | Malformed data structure | Validate data schema before publishing | 
| "Following app timeout" | Processing takes too long | Optimize following app logic, implement batching | 
| "Circular dependency detected" | App following creates loop | Review app following relationships | 
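For the "Circular dependency detected" row, follow relationships live in DevCenter configuration, but a small offline sketch like the one below can help spot loops before you configure them. The follows mapping is hypothetical and simplifies each app to following a single upstream app:
def find_cycle(follows: dict) -> list:
    """Return one cycle in a follower -> followed mapping, or [] if none."""
    visited, stack = set(), []

    def dfs(app):
        if app in stack:
            return stack[stack.index(app):] + [app]
        if app in visited or app not in follows:
            return []
        visited.add(app)
        stack.append(app)
        cycle = dfs(follows[app])
        stack.pop()
        return cycle

    for app in follows:
        cycle = dfs(app)
        if cycle:
            return cycle
    return []

# Example: three apps that follow each other in a loop
print(find_cycle({'app-a': 'app-c', 'app-b': 'app-a', 'app-c': 'app-b'}))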
Performance Monitoring
import time
from functools import wraps

from corva import Logger, scheduled
def monitor_performance(func):
    @wraps(func)
    def wrapper(event, api, cache):
        start_time = time.time()
        
        try:
            result = func(event, api, cache)
            duration = time.time() - start_time
            
            # Log performance metrics
            Logger.info(f"{func.__name__} completed in {duration:.2f}s")
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            Logger.error(f"{func.__name__} failed after {duration:.2f}s: {str(e)}")
            raise
            
    return wrapper
# Note: @monitor_performance sits below @scheduled so it wraps the user
# function (event, api, cache), not the Lambda handler that @scheduled returns.
@scheduled
@monitor_performance
def monitored_followable_app(event, api, cache):
    # Your app logic here
    pass
Conclusion
Followable Apps are a powerful feature in the Corva platform that enable the creation of sophisticated, event-driven data processing pipelines. By understanding the concepts, best practices, and implementation patterns outlined in this documentation, you can build robust and scalable applications that leverage the full potential of the Corva ecosystem.
Key Takeaways:
- Chain Reactions: Followable apps create automated workflows where data flows seamlessly between applications
 - Real-time Processing: Enable real-time analytics and immediate response to data events
 - Modular Architecture: Break complex processing into specialized, maintainable components
 - Scalable Design: Handle large volumes of data through efficient processing patterns
 
Next Steps:
- Review the Corva Python SDK documentation for additional details
 - Explore the Corva DevCenter for platform-specific guidance
 - Start with simple followable app examples before building complex workflows
 - Implement comprehensive testing and monitoring for production deployments
 
For additional support and examples, visit the Corva DevCenter Documentation.